/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.action.hadoop;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.command.wf.WorkflowXCommand;
import org.apache.oozie.dependency.FSURIHandler;
import org.apache.oozie.dependency.URIHandler;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UserGroupInformationService;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;

/**
 * File system action executor. <p> This executes the file system mkdir, move and delete commands
 */
public class FsActionExecutor extends ActionExecutor {

    public static final String ACTION_TYPE = "fs";

    public static final String FS_OP_MKDIR = "mkdir";
    public static final String FS_OP_DELETE = "delete";
    public static final String FS_OP_MOVE = "move";
    public static final String FS_OP_CHMOD = "chmod";
    public static final String FS_OP_TOUCHZ = "touchz";
    public static final String FS_OP_CHGRP = "chgrp";
    public static final String FS_OP_SETREP = "setrep";

    public static final String FS_TAG_PATH = "path";
    public static final String FS_TAG_SOURCE = "source";
    public static final String FS_TAG_TARGET = "target";
    public static final String FS_TAG_RECURSIVE = "recursive";
    public static final String FS_TAG_DIRFILES = "dir-files";
    public static final String FS_TAG_SKIPTRASH = "skip-trash";
    public static final String FS_TAG_PERMISSIONS = "permissions";

    private final int maxGlobCount;

    private final XLog LOG = XLog.getLog(getClass());

    public FsActionExecutor() {
        super(ACTION_TYPE);
        maxGlobCount = ConfigurationService.getInt(LauncherAMUtils.CONF_OOZIE_ACTION_FS_GLOB_MAX);
    }

    /**
    * Initialize Action.
    */
    @Override
    public void initActionType() {
        super.initActionType();
        registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.ERROR, "FS014");
    }

    Path getPath(Element element, String attribute) {
        String str = element.getAttributeValue(attribute).trim();
        return new Path(str);
    }

    void validatePath(Path path, boolean withScheme) throws ActionExecutorException {
        try {
            String scheme = path.toUri().getScheme();
            if (withScheme) {
                if (scheme == null) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS001",
                                                      "Missing scheme in path [{0}]", path);
                }
                else {
                    Services.get().get(HadoopAccessorService.class).checkSupportedFilesystem(path.toUri());
                }
            }
            else {
                if (scheme != null) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS002",
                                                      "Scheme [{0}] not allowed in path [{1}]", scheme, path);
                }
            }
        }
        catch (HadoopAccessorException hex) {
            throw convertException(hex);
        }
    }

    Path resolveToFullPath(Path nameNode, Path path, boolean withScheme) throws ActionExecutorException {
        Path fullPath;

        // If no nameNode is given, validate the path as-is and return it as-is
        if (nameNode == null) {
            validatePath(path, withScheme);
            fullPath = path;
        } else {
            // If the path doesn't have a scheme or authority, use the nameNode which should have already been verified earlier
            String pathScheme = path.toUri().getScheme();
            String pathAuthority = path.toUri().getAuthority();
            if (pathScheme == null || pathAuthority == null) {
                if (path.isAbsolute()) {
                    String nameNodeSchemeAuthority = nameNode.toUri().getScheme() + "://" + nameNode.toUri().getAuthority();
                    fullPath = new Path(nameNodeSchemeAuthority + path.toString());
                } else {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS011",
                            "Path [{0}] cannot be relative", path);
                }
            } else {
                // If the path has a scheme and authority, but its not the nameNode then validate the path as-is and return it as-is
                // If it is the nameNode, then it should have already been verified earlier so return it as-is
                if (!nameNode.toUri().getScheme().equals(pathScheme) || !nameNode.toUri().getAuthority().equals(pathAuthority)) {
                    validatePath(path, withScheme);
                }
                fullPath = path;
            }
        }
        return fullPath;
    }

    void validateSameNN(Path source, Path dest) throws ActionExecutorException {
        Path destPath = new Path(source, dest);
        String t = destPath.toUri().getScheme() + destPath.toUri().getAuthority();
        String s = source.toUri().getScheme() + source.toUri().getAuthority();

        //checking whether NN prefix of source and target is same. can modify this to adjust for a set of multiple whitelisted NN
        if(!t.equals(s)) {
            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS007",
                    "move, target NN URI different from that of source", dest);
        }
    }

    @SuppressWarnings("unchecked")
    void doOperations(Context context, Element element) throws ActionExecutorException {
        try {
            FileSystem fs = context.getAppFileSystem();
            boolean recovery = fs.exists(getRecoveryPath(context));
            if (!recovery) {
                fs.mkdirs(getRecoveryPath(context));
            }

            Path nameNodePath = null;
            Element nameNodeElement = element.getChild("name-node", element.getNamespace());
            if (nameNodeElement != null) {
                String nameNode = nameNodeElement.getTextTrim();
                if (nameNode != null) {
                    nameNodePath = new Path(nameNode);
                    // Verify the name node now
                    validatePath(nameNodePath, true);
                }
            }

            XConfiguration fsConf = new XConfiguration();
            Path appPath = new Path(context.getWorkflow().getAppPath());
            // app path could be a file
            if (fs.isFile(appPath)) {
                appPath = appPath.getParent();
            }
            JavaActionExecutor.parseJobXmlAndConfiguration(context, element, appPath, fsConf);

            for (Element commandElement : (List<Element>) element.getChildren()) {
                String command = commandElement.getName();
                switch (command) {
                    case FS_OP_MKDIR:
                        doMkdirOperation(context, fsConf, nameNodePath, commandElement);
                        break;
                    case FS_OP_DELETE:
                        doDeleteOperation(context, fsConf, nameNodePath, commandElement);
                        break;
                    case FS_OP_MOVE:
                        doMoveOperation(context, fsConf, nameNodePath, commandElement, recovery);
                        break;
                    case FS_OP_CHMOD:
                        doChmodOperation(context, fsConf, nameNodePath, commandElement);
                        break;
                    case FS_OP_TOUCHZ:
                        doTouchzOperation(context, fsConf, nameNodePath, commandElement);
                        break;
                    case FS_OP_CHGRP:
                        doChgrpOperation(context, fsConf, nameNodePath, commandElement);
                        break;
                    case FS_OP_SETREP:
                        doSetrepOperation(context, commandElement);
                        break;
                    default:
                        LOG.warn("Fs Action don't support [{0}] operation for the time being ", command);
                }
            }
        } catch (Exception ex) {
            throw convertException(ex);
        }
    }

    private void doMkdirOperation(Context context,  XConfiguration fsConf, Path nameNodePath,
                                  Element commandElement) throws ActionExecutorException {
        Path path = getPath(commandElement, FS_TAG_PATH);
        mkdir(context, fsConf, nameNodePath, path);
    }

    private void doDeleteOperation(Context context, XConfiguration fsConf, Path nameNodePath,
                                   Element commandElement) throws ActionExecutorException {
        Path path = getPath(commandElement, FS_TAG_PATH);
        boolean skipTrash = true;
        String skipTrashTag = commandElement.getAttributeValue(FS_TAG_SKIPTRASH);
        if (skipTrashTag != null && skipTrashTag.equals("false")) {
            skipTrash = false;
        }
        delete(context, fsConf, nameNodePath, path, skipTrash);
    }

    private void doMoveOperation(Context context, XConfiguration fsConf, Path nameNodePath,
                                 Element commandElement, boolean recovery) throws ActionExecutorException {
        Path source = getPath(commandElement, FS_TAG_SOURCE);
        Path target = getPath(commandElement, FS_TAG_TARGET);
        move(context, fsConf, nameNodePath, source, target, recovery);
    }

    private void doChmodOperation(Context context, XConfiguration fsConf, Path nameNodePath,
                                  Element commandElement) throws ActionExecutorException {
        Path path = getPath(commandElement, FS_TAG_PATH);
        String permissionsTag = commandElement.getAttributeValue(FS_TAG_PERMISSIONS).trim();
        String dirFilesTag = commandElement.getAttributeValue(FS_TAG_DIRFILES);
        boolean dirFiles = (dirFilesTag == null) || Boolean.parseBoolean(dirFilesTag);
        boolean recursive = commandElement.getChild(FS_TAG_RECURSIVE, commandElement.getNamespace()) != null;
        chmod(context, fsConf, nameNodePath, path, permissionsTag, dirFiles, recursive);
    }

    private void doTouchzOperation(Context context, XConfiguration fsConf, Path nameNodePath,
                                   Element commandElement) throws ActionExecutorException {
        Path path = getPath(commandElement, FS_TAG_PATH);
        touchz(context, fsConf, nameNodePath, path);
    }

    private void doChgrpOperation(Context context, XConfiguration fsConf, Path nameNodePath,
                                  Element commandElement) throws ActionExecutorException {
        Path path = getPath(commandElement, FS_TAG_PATH);
        String groupTag = commandElement.getAttributeValue("group");
        String dirFilesTag = commandElement.getAttributeValue(FS_TAG_DIRFILES);
        boolean dirFiles = (dirFilesTag == null) || Boolean.parseBoolean(dirFilesTag);
        boolean recursive = commandElement.getChild(FS_TAG_RECURSIVE, commandElement.getNamespace()) != null;
        chgrp(context, fsConf, nameNodePath, path, context.getWorkflow().getUser(),
                groupTag, dirFiles, recursive);
    }

    private void doSetrepOperation(Context context, Element commandElement)
            throws ActionExecutorException, HadoopAccessorException {
        Path path = getPath(commandElement, FS_TAG_PATH);
        String replicationFactor = commandElement.getAttributeValue("replication-factor");
        if (replicationFactor != null) {
            setrep(context, path, Short.parseShort(replicationFactor));
        }
    }

    void chgrp(Context context, XConfiguration fsConf, Path nameNodePath, Path path, String user, String group,
            boolean dirFiles, boolean recursive) throws ActionExecutorException {

        LOG.info("Setting ownership for [{0}] to group: [{1}], user: [{2}]. Recursive mode: [{3}]", path, group, user, recursive);
        HashMap<String, String> argsMap = new HashMap<String, String>();
        argsMap.put("user", user);
        argsMap.put("group", group);
        try {
            FileSystem fs = getFileSystemFor(path, context, fsConf);
            path = resolveToFullPath(nameNodePath, path, true);
            Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path));
            if (pathArr == null || pathArr.length == 0) {
                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009", "chgrp"
                        + ", path(s) that matches [{0}] does not exist", path);
            }
            checkGlobMax(pathArr);
            for (Path p : pathArr) {
                recursiveFsOperation("chgrp", fs, nameNodePath, p, argsMap, dirFiles, recursive, true);
            }
        }
        catch (Exception ex) {
            throw convertException(ex);
        }
    }

    private void recursiveFsOperation(String op, FileSystem fs, Path nameNodePath, Path path,
            Map<String, String> argsMap, boolean dirFiles, boolean recursive, boolean isRoot)
            throws ActionExecutorException {

        try {
            FileStatus pathStatus = fs.getFileStatus(path);
            List<Path> paths = new ArrayList<Path>();

            if (dirFiles && pathStatus.isDirectory()) {
                if (isRoot) {
                    paths.add(path);
                }
                FileStatus[] filesStatus = fs.listStatus(path);
                for (int i = 0; i < filesStatus.length; i++) {
                    Path p = filesStatus[i].getPath();
                    paths.add(p);
                    if (recursive && filesStatus[i].isDirectory()) {
                        recursiveFsOperation(op, fs, null, p, argsMap, dirFiles, recursive, false);
                    }
                }
            }
            else {
                paths.add(path);
            }
            for (Path p : paths) {
                doFsOperation(op, fs, p, argsMap);
            }
        }
        catch (Exception ex) {
            throw convertException(ex);
        }
    }

    private void doFsOperation(String op, FileSystem fs, Path p, Map<String, String> argsMap)
            throws ActionExecutorException, IOException {
        if (op.equals(FS_OP_CHMOD)) {
            String permissions = argsMap.get(FS_TAG_PERMISSIONS);
            FsPermission newFsPermission = createShortPermission(permissions, p);
            fs.setPermission(p, newFsPermission);
        }
        else if (op.equals(FS_OP_CHGRP)) {
            String user = argsMap.get("user");
            String group = argsMap.get("group");
            fs.setOwner(p, user, group);
        }
    }

    /**
     * @param path file path
     * @param context executor context
     * @param fsConf file system configuration
     * @return FileSystem
     * @throws HadoopAccessorException if FS is not accessible
     */
    private FileSystem getFileSystemFor(Path path, Context context, XConfiguration fsConf) throws HadoopAccessorException {
        String user = context.getWorkflow().getUser();
        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
        Configuration conf = has.createConfiguration(path.toUri().getAuthority());
        XConfiguration.copy(context.getProtoActionConf(), conf);
        if (fsConf != null) {
            XConfiguration.copy(fsConf, conf);
        }
        return has.createFileSystem(user, path.toUri(), conf);
    }

    /**
     * @param path file path
     * @param user user
     * @return FileSystem
     * @throws HadoopAccessorException if FS is not accessible
     */
    private FileSystem getFileSystemFor(Path path, String user) throws HadoopAccessorException {
        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
        Configuration jobConf = has.createConfiguration(path.toUri().getAuthority());
        return has.createFileSystem(user, path.toUri(), jobConf);
    }

    void mkdir(Context context, Path path) throws ActionExecutorException {
        mkdir(context, null, null, path);
    }

    void mkdir(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
        LOG.info("Creating directory [{0}]", path);
        try {
            path = resolveToFullPath(nameNodePath, path, true);
            FileSystem fs = getFileSystemFor(path, context, fsConf);

            if (!fs.exists(path)) {
                if (!fs.mkdirs(path)) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS004",
                                                      "mkdir, path [{0}] could not create directory", path);
                }
            } else {
                LOG.info("[{0}] already exist, no need for creation", path);
            }
        }
        catch (Exception ex) {
            throw convertException(ex);
        }
    }

    /**
     * Delete path
     *
     * @param context executor context
     * @param path file path
     * @throws ActionExecutorException if FS error occurs
     */
    public void delete(Context context, Path path) throws ActionExecutorException {
        delete(context, null, null, path, true);
    }

    /**
     * Delete path
     *
     * @param context executor context
     * @param fsConf file system configuration
     * @param nameNodePath configured name node path
     * @param path file path
     * @param skipTrash flag to skip the trash.
     * @throws ActionExecutorException if FS error occurs
     */
    public void delete(Context context, XConfiguration fsConf, Path nameNodePath, Path path, boolean skipTrash)
            throws ActionExecutorException {
        LOG.info("Deleting [{0}]. Skipping trash: [{1}]", path, skipTrash);
        URI uri = path.toUri();
        URIHandler handler;
        org.apache.oozie.dependency.URIHandler.Context hcatContext = null;
        try {
            handler = Services.get().get(URIHandlerService.class).getURIHandler(uri);
            if (handler instanceof FSURIHandler) {
                // Use legacy code to handle hdfs partition deletion
                path = resolveToFullPath(nameNodePath, path, true);
                final FileSystem fs = getFileSystemFor(path, context, fsConf);
                Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path));
                if (pathArr != null && pathArr.length > 0) {
                    checkGlobMax(pathArr);
                    for (final Path p : pathArr) {
                        if (fs.exists(p)) {
                            if (!skipTrash) {
                                // Moving directory/file to trash of user.
                                UserGroupInformationService ugiService = Services.get().get(UserGroupInformationService.class);
                                UserGroupInformation ugi = ugiService.getProxyUser(fs.getConf().get(OozieClient.USER_NAME));
                                ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                                    @Override
                                    public FileSystem run() throws Exception {
                                        Trash trash = new Trash(fs.getConf());
                                        if (!trash.moveToTrash(p)) {
                                            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
                                                    "Could not move path [{0}] to trash on delete", p);
                                        }
                                        return null;
                                    }
                                });
                            }
                            else if (!fs.delete(p, true)) {
                                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
                                        "delete, path [{0}] could not delete path", p);
                            }
                        }
                    }
                }
            } else {
                hcatContext = handler.getContext(uri, fsConf, context.getWorkflow().getUser(), false);
                handler.delete(uri, hcatContext);
            }
        }
        catch (Exception ex) {
            throw convertException(ex);
        }
        finally{
            if (hcatContext != null) {
                hcatContext.destroy();
            }
        }
    }

    /**
     * Delete path
     *
     * @param user user
     * @param group group
     * @param path file path
     * @throws ActionExecutorException if FS error occurs
     */
    public void delete(String user, String group, Path path) throws ActionExecutorException {
        try {
            validatePath(path, true);
            FileSystem fs = getFileSystemFor(path, user);

            if (fs.exists(path)) {
                if (!fs.delete(path, true)) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
                            "delete, path [{0}] could not delete path", path);
                }
            }
        }
        catch (Exception ex) {
            throw convertException(ex);
        }
    }

    /**
     * Move source to target
     *
     * @param context executor context
     * @param source source path
     * @param target target path
     * @param recovery set to true if in recovery mode
     * @throws ActionExecutorException if FS error occurs
     */
    public void move(Context context, Path source, Path target, boolean recovery) throws ActionExecutorException {
        move(context, null, null, source, target, recovery);
    }

    /**
     * Move source to target
     *
     * @param context executor context
     * @param fsConf file system configuration
     * @param nameNodePath configured name node path
     * @param source source path
     * @param target target path
     * @param recovery set to true if in recovery mode
     * @throws ActionExecutorException if FS error occurs
     */
    public void move(Context context, XConfiguration fsConf, Path nameNodePath, Path source, Path target, boolean recovery)
            throws ActionExecutorException {
        LOG.info("Moving [{0}] to [{1}]", source, target);
        try {
            source = resolveToFullPath(nameNodePath, source, true);
            validateSameNN(source, target);
            FileSystem fs = getFileSystemFor(source, context, fsConf);
            Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(source));
            if (( pathArr == null || pathArr.length == 0 ) ){
                if (!recovery) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS006",
                        "move, source path [{0}] does not exist", source);
                } else {
                    return;
                }
            }
            if (pathArr.length > 1 && (!fs.exists(target) || fs.isFile(target))) {
                if(!recovery) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS012",
                            "move, could not rename multiple sources to the same target name");
                } else {
                    return;
                }
            }
            checkGlobMax(pathArr);
            for (Path p : pathArr) {
                if (!fs.rename(p, target) && !recovery) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS008",
                            "move, could not move [{0}] to [{1}]", p, target);
                }
            }
        }
        catch (Exception ex) {
            throw convertException(ex);
        }
    }

    void chmod(Context context, Path path, String permissions, boolean dirFiles, boolean recursive) throws ActionExecutorException {
        chmod(context, null, null, path, permissions, dirFiles, recursive);
    }

    void chmod(Context context, XConfiguration fsConf, Path nameNodePath, Path path, String permissions,
            boolean dirFiles, boolean recursive) throws ActionExecutorException {

        LOG.info("Setting permissions [{0}] on [{1}]. Recursive mode: [{2}]", permissions, path, recursive);
        HashMap<String, String> argsMap = new HashMap<String, String>();
        argsMap.put(FS_TAG_PERMISSIONS, permissions);
        try {
            FileSystem fs = getFileSystemFor(path, context, fsConf);
            path = resolveToFullPath(nameNodePath, path, true);
            Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path));
            if (pathArr == null || pathArr.length == 0) {
                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009", "chmod"
                        + ", path(s) that matches [{0}] does not exist", path);
            }
            checkGlobMax(pathArr);
            for (Path p : pathArr) {
                recursiveFsOperation(FS_OP_CHMOD, fs, nameNodePath, p, argsMap, dirFiles, recursive, true);
            }

        }
        catch (Exception ex) {
            throw convertException(ex);
        }
    }

    void touchz(Context context, Path path) throws ActionExecutorException {
        touchz(context, null, null, path);
    }

    void touchz(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {

        LOG.info ("Performing touch on [{0}]", path);
        try {
            path = resolveToFullPath(nameNodePath, path, true);
            FileSystem fs = getFileSystemFor(path, context, fsConf);

            FileStatus st;
            if (fs.exists(path)) {
                st = fs.getFileStatus(path);
                if (st.isDirectory()) {
                    throw new Exception(path.toString() + " is a directory");
                } else if (st.getLen() != 0) {
                    throw new Exception(path.toString() + " must be a zero-length file");
                }
            }
            FSDataOutputStream out = fs.create(path);
            out.close();
        }
        catch (Exception ex) {
            throw convertException(ex);
        }
    }

    FsPermission createShortPermission(String permissions, Path path) throws ActionExecutorException {
        if (permissions.length() == 3) {
            char user = permissions.charAt(0);
            char group = permissions.charAt(1);
            char other = permissions.charAt(2);
            int useri = user - '0';
            int groupi = group - '0';
            int otheri = other - '0';
            int mask = useri * 100 + groupi * 10 + otheri;
            short omask = Short.parseShort(Integer.toString(mask), 8);
            return new FsPermission(omask);
        }
        else {
            if (permissions.length() == 10) {
                return FsPermission.valueOf(permissions);
            }
            else {
                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS010",
                                                  "chmod, path [{0}] invalid permissions mask [{1}]", path, permissions);
            }
        }
    }

    @Override
    public void check(Context context, WorkflowAction action) throws ActionExecutorException {
    }

    @Override
    public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
    }

    @Override
    public void start(Context context, WorkflowAction action) throws ActionExecutorException {
        LOG.info("Starting action");
        try {
            context.setStartData("-", "-", "-");
            Element actionXml = XmlUtils.parseXml(action.getConf());
            doOperations(context, actionXml);
            context.setExecutionData("OK", null);
        }
        catch (Exception ex) {
            throw convertException(ex);
        }
    }

    @Override
    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
        String externalStatus = action.getExternalStatus();
        WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK :
                                       WorkflowAction.Status.ERROR;
        context.setEndData(status, getActionSignal(status));
        if (!context.getProtoActionConf().getBoolean(WorkflowXCommand.KEEP_WF_ACTION_DIR, false)) {
            try {
                FileSystem fs = context.getAppFileSystem();
                fs.delete(context.getActionDir(), true);
            }
            catch (Exception ex) {
                throw convertException(ex);
            }
        }
        LOG.info("Action ended with external status [{0}]", action.getExternalStatus());
    }

    @Override
    public boolean isCompleted(String externalStatus) {
        return true;
    }

    /**
     * @param context executor context
     * @return Path returns recovery path
     * @throws HadoopAccessorException if accessing file system fails
     * @throws IOException in case of IO error
     * @throws URISyntaxException if the processed uri is not a valid URI
     */
    public Path getRecoveryPath(Context context) throws HadoopAccessorException, IOException, URISyntaxException {
        return new Path(context.getActionDir(), "fs-" + context.getRecoveryId());
    }

    private void checkGlobMax(Path[] pathArr) throws ActionExecutorException {
        if(pathArr.length > maxGlobCount) {
            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS013",
                    "too many globbed files/dirs to do FS operation");
        }
    }
    void setrep(Context context, Path path, short replicationFactor)
            throws ActionExecutorException, HadoopAccessorException {
        LOG.info("Setting replication factor: [{0}] for [{1}]", replicationFactor, path);
        try {
            path = resolveToFullPath(null, path, true);
            FileSystem fs = getFileSystemFor(path, context, null);

            if (fs.isFile(path)) {
                fs.setReplication(path, replicationFactor);
            }
        } catch (IOException ex) {
            convertException(ex);
        }
    }

    public boolean supportsConfigurationJobXML() {
        return true;
    }
}
